package com.dahuan.tables;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class Table_FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );

        String path = "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";
        //读取路径
        tableEnv.connect( new FileSystem().path( path ) )
                //指定定义如何从连接器读取数据的格式。
                .withFormat( new Csv() )
                //指定结果表架构。
                .withSchema( new Schema()
                        .field( "id", DataTypes.STRING() )
                        .field( "timestamp", DataTypes.BIGINT() )
                        .field( "temp", DataTypes.DOUBLE() )
                //TODO 在给定路径中注册由基础属性描述的表。
                ).createTemporaryTable( "inputTable" );

        //TODO 读取已经注册的表并生成Table对象
        Table inputTable = tableEnv.from( "inputTable" );
        tableEnv.toAppendStream( inputTable, Row.class ).print("inputTable");
        //TODO 以树格式将此表的模式打印到控制台。
        inputTable.printSchema();
        env.execute( "Table_FileSource" );
    }
}
